// Copyright 2017 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package engine

import (
	"bytes"
	"context"
	"fmt"
	"math"
	"strings"
	"time"

	"go.chromium.org/luci/auth/identity"
	"go.chromium.org/luci/common/clock"
	"go.chromium.org/luci/common/data/rand/mathrand"
	"go.chromium.org/luci/common/errors"
	"go.chromium.org/luci/common/logging"
	"go.chromium.org/luci/common/retry/transient"
	"go.chromium.org/luci/common/tsmon/distribution"
	"go.chromium.org/luci/common/tsmon/field"
	"go.chromium.org/luci/common/tsmon/metric"
	"go.chromium.org/luci/common/tsmon/types"
	"go.chromium.org/luci/gae/service/datastore"

	"go.chromium.org/luci/scheduler/appengine/internal"
	"go.chromium.org/luci/scheduler/appengine/task"
)

// errInvocationIDConflict is returned by generateInvocationID when the
// candidate ID it picked is already taken by an existing Invocation entity.
//
// It carries transient.Tag, since the whole transaction is expected to be
// retried in that case.
var errInvocationIDConflict = errors.New("could not find available invocationID", transient.Tag)

const (
	// debugLogSizeLimit is the maximum size (in bytes) the invocation debug log
	// may reach before it gets trimmed by 'trimDebugLog'. The debug log isn't
	// supposed to be huge.
	debugLogSizeLimit = 200000

	// debugLogTailLines is how many of the last log lines 'trimDebugLog' keeps
	// when trimming the log.
	debugLogTailLines = 100
)

// invocationIDEpoch is Jan 1 2015, 00:00 in UTC.
//
// generateInvocationID measures time relative to this instant when building
// invocation IDs.
var invocationIDEpoch = time.Date(2015, time.January, 1, 0, 0, 0, 0, time.UTC)

var (
	// distributionBucketerSecToDay is a geometric bucketer that covers durations
	// from 1s to 1 day.
	//
	// 0.05037 is log10(exp(ln(24*60*60)/98)), i.e. log10(86400)/98, which means
	// the last bucket will contain overflow of everything >=1 day.
	distributionBucketerSecToDay = distribution.GeometricBucketer(math.Pow(10.0, 0.05037), 100)

	// metricInvocationsDurations is a cumulative distribution of durations of
	// completed invocations, in seconds. See reportCompletionMetrics for where
	// values are added.
	metricInvocationsDurations = metric.NewCumulativeDistribution(
		"luci/scheduler/invocations/durations",
		"Durations of completed invocations (sec).",
		&types.MetricMetadata{Units: types.Seconds},
		distributionBucketerSecToDay,
		field.String("jobID"),
		field.String("status"), // one of the final statuses of the task.Status enum
	)
)

// generateInvocationID picks an ID for a new Invocation and verifies that no
// Invocation entity with this ID exists yet.
//
// It is meant to be called within a transaction. In essence it picks a root
// key for a new entity group and checks that the key is still free.
//
// Layout of the produced ID, from the most to the least significant bit:
//   - 1 bit set to 0, to keep the value positive.
//   - 43 bits with the bitwise-inverted number of milliseconds since
//     invocationIDEpoch.
//   - 16 bits of randomness produced by mathrand.
//   - 4 bits set to 0. They indicate the ID format.
//
// Only a single attempt at allocating an ID is made. If the ID happens to be
// taken (should be extremely rare), errInvocationIDConflict is returned and
// the entire transaction should be retried. This avoids unnecessarily hitting
// multiple entity groups from a single transaction.
//
// Returns only transient errors.
func generateInvocationID(c context.Context) (int64, error) {
	const (
		timestampMask  = 1<<43 - 1 // 43 one-bits; drops everything above the timestamp
		timestampShift = 20        // room for 16 random bits + 4 format bits
		randomRange    = 1 << 16   // random suffix is in [0, 65536)
		formatShift    = 4         // lowest 4 bits stay zero
	)

	// See http://play.golang.org/p/POpQzpT4Up.
	sinceEpochMs := int64(clock.Now(c).UTC().Sub(invocationIDEpoch) / time.Millisecond)
	timestampBits := (^sinceEpochMs & timestampMask) << timestampShift

	suffix := mathrand.Int63n(c, randomRange)
	invID := timestampBits | (suffix << formatShift)

	key := datastore.NewKey(c, "Invocation", "", invID, nil)
	res, err := datastore.Exists(c, key)
	switch {
	case err != nil:
		return 0, transient.Tag.Apply(err)
	case res.All():
		// The ID is already in use, let the caller retry the transaction.
		return 0, errInvocationIDConflict
	}
	return invID, nil
}

// Invocation entity stores a single invocation of a job (with perhaps multiple
// attempts due to retries if the invocation fails to start).
//
// Root entity. ID is generated based on time by generateInvocationID()
// function.
type Invocation struct {
	// Datastore meta fields.
	// NOTE(review): '_extra' presumably collects stored properties that have no
	// matching struct field; confirm against the luci/gae datastore docs.
	_kind  string                `gae:"$kind,Invocation"`
	_extra datastore.PropertyMap `gae:"-,extra"`

	// ID is identifier of this particular attempt to run a job.
	ID int64 `gae:"$id"`

	// JobID is '<ProjectID>/<JobName>' string of a parent job.
	//
	// Set when the invocation is created and never changes.
	JobID string `gae:",noindex"`

	// IndexedJobID is '<ProjectID>/<JobName>' string of a parent job, but it is
	// set only for finished invocations.
	//
	// It is used to make the invocations appear in the listings of finished
	// invocations.
	//
	// We can't use JobID field for this since the invocation launch procedure can
	// potentially generate orphaned "garbage" invocations in some edge cases (if
	// Invocation transaction lands, but separate Job transaction doesn't). They
	// are harmless, but we don't want them to show up in listings.
	IndexedJobID string

	// RealmID is a global realm name (i.e. "<ProjectID>:...") the invocation
	// belongs to.
	//
	// It is copied from the Job entity when the invocation is created. May be
	// empty for old invocations.
	RealmID string `gae:",noindex"`

	// Started is time when this invocation was created.
	Started time.Time `gae:",noindex"`

	// Finished is time when this invocation transitioned to a terminal state.
	Finished time.Time `gae:",noindex"`

	// TriggeredBy is identity of whoever triggered the invocation, if it was
	// triggered via a single trigger submitted by some external user (not by the
	// service itself).
	//
	// Empty identity string if it was triggered by the service itself.
	TriggeredBy identity.Identity

	// PropertiesRaw is a blob with serialized task.Request.Properties supplied
	// when the invocation was created.
	//
	// Task managers use it to prepare the parameters for tasks.
	PropertiesRaw []byte `gae:",noindex"`

	// Tags is a sorted list of indexed "key:value" pairs supplied via
	// task.Request.Tags when the invocation was created.
	//
	// May be passed down the stack by task managers.
	Tags []string

	// IncomingTriggersRaw is a serialized list of triggers that the invocation
	// consumed.
	//
	// They are popped from job's pending triggers set when the invocation
	// starts.
	//
	// Use IncomingTriggers() function to grab them in deserialized form.
	IncomingTriggersRaw []byte `gae:",noindex"`

	// OutgoingTriggersRaw is a serialized list of triggers that the invocation
	// produced.
	//
	// They are fanned out into pending trigger sets of corresponding triggered
	// jobs (specified by TriggeredJobIDs).
	//
	// Use OutgoingTriggers() function to grab them in deserialized form.
	OutgoingTriggersRaw []byte `gae:",noindex"`

	// PendingTimersRaw is a serialized list of pending invocation timers.
	//
	// Timers are emitted by Controller's AddTimer call.
	//
	// Use PendingTimers() function to grab them in deserialized form.
	PendingTimersRaw []byte `gae:",noindex"`

	// Revision is revision number of config.cfg when this invocation was created.
	// For informational purposes.
	Revision string `gae:",noindex"`

	// RevisionURL is URL to a human readable page with the config file at
	// an appropriate revision. For informational purposes.
	RevisionURL string `gae:",noindex"`

	// Task is the job payload for this invocation in binary serialized form.
	// For informational purposes. See Catalog.UnmarshalTask().
	Task []byte `gae:",noindex"`

	// TriggeredJobIDs is a list of jobIDs of jobs which this job triggers.
	// The list is sorted and without duplicates.
	TriggeredJobIDs []string `gae:",noindex"`

	// DebugLog is a short free form text log with debug messages.
	//
	// It is appended to via debugLog() and kept within size limits by
	// trimDebugLog().
	DebugLog string `gae:",noindex"`

	// RetryCount is 0 on a first attempt to launch the task. Increased with each
	// retry. For informational purposes.
	RetryCount int64 `gae:",noindex"`

	// Status is current status of the invocation (e.g. "RUNNING"), see the enum.
	Status task.Status

	// ViewURL is an optional URL to a human readable page with task status, e.g.
	// Swarming task page. Populated by corresponding TaskManager.
	ViewURL string `gae:",noindex"`

	// TaskData is a storage where TaskManager can keep task-specific state
	// between calls.
	TaskData []byte `gae:",noindex"`

	// MutationsCount is used for simple compare-and-swap transaction control.
	//
	// It is incremented on each change to the entity.
	MutationsCount int64 `gae:",noindex"`
}

// isEqual returns true iff 'e' is equal to 'other'.
//
// Two invocations are equal if they are the same object or if all their
// stored fields match, including RealmID. Timestamps are compared with
// time.Time.Equal (i.e. as instants), byte blobs with bytes.Equal and the
// sorted string lists (Tags, TriggeredJobIDs) with equalSortedLists.
//
// Both 'e' and 'other' must be non-nil unless they are the same pointer.
func (e *Invocation) isEqual(other *Invocation) bool {
	return e == other || (e.ID == other.ID &&
		e.MutationsCount == other.MutationsCount && // compare it first, it changes most often
		e.JobID == other.JobID &&
		e.IndexedJobID == other.IndexedJobID &&
		e.RealmID == other.RealmID &&
		e.Started.Equal(other.Started) &&
		e.Finished.Equal(other.Finished) &&
		e.TriggeredBy == other.TriggeredBy &&
		bytes.Equal(e.PropertiesRaw, other.PropertiesRaw) &&
		equalSortedLists(e.Tags, other.Tags) &&
		bytes.Equal(e.IncomingTriggersRaw, other.IncomingTriggersRaw) &&
		bytes.Equal(e.OutgoingTriggersRaw, other.OutgoingTriggersRaw) &&
		bytes.Equal(e.PendingTimersRaw, other.PendingTimersRaw) &&
		e.Revision == other.Revision &&
		e.RevisionURL == other.RevisionURL &&
		bytes.Equal(e.Task, other.Task) &&
		equalSortedLists(e.TriggeredJobIDs, other.TriggeredJobIDs) &&
		e.DebugLog == other.DebugLog &&
		e.RetryCount == other.RetryCount &&
		e.Status == other.Status &&
		e.ViewURL == other.ViewURL &&
		bytes.Equal(e.TaskData, other.TaskData))
}

// GetProjectID returns the project part of JobID.
//
// JobID has the form '<ProjectID>/<JobName>', so this is everything before the
// first "/". If JobID has no "/", the whole JobID is returned.
func (e *Invocation) GetProjectID() string {
	projectID, _, _ := strings.Cut(e.JobID, "/")
	return projectID
}

// debugLog appends a line to DebugLog field.
//
// The line is described by 'format' and 'args'; the actual work is delegated
// to the package-level debugLog helper, which receives a pointer to the
// DebugLog field.
func (e *Invocation) debugLog(c context.Context, format string, args ...any) {
	debugLog(c, &e.DebugLog, format, args...)
}

// trimDebugLog makes sure DebugLog field doesn't exceed debugLogSizeLimit
// bytes.
//
// It cuts out the middle of the log: the beginning of the log and its last
// debugLogTailLines lines are kept, separated by a marker line saying that the
// log has been cut. We need to do this to keep the entity small enough to fit
// the datastore limits.
//
// If such a cut is not possible (e.g. the log has some giant lines or too few
// lines), it falls back to keeping only the beginning of the log followed by
// the marker line.
func (e *Invocation) trimDebugLog() {
	full := e.DebugLog
	if len(full) <= debugLogSizeLimit {
		return
	}

	const cutMsg = "--- the log has been cut here ---"

	// Fallback: chop off the end of the log so that, together with the marker
	// line, it is exactly debugLogSizeLimit bytes long.
	chopEnd := func() {
		e.DebugLog = full[:debugLogSizeLimit-len(cutMsg)-2] + "\n" + cutMsg + "\n"
	}

	// Walk backwards over debugLogTailLines "\n" characters. The search always
	// excludes the byte right before the current position, so a trailing "\n"
	// of the log is not counted on the first step. After the loop
	// full[tailStart:] is the log tail.
	tailStart := len(full)
	for n := 0; n < debugLogTailLines; n++ {
		if tailStart = strings.LastIndex(full[:tailStart-1], "\n"); tailStart <= 0 {
			chopEnd()
			return
		}
	}
	tailStart++ // step over the "\n" itself

	// The tail and the marker line (with its "\n") are always kept. Whatever
	// remains of the size budget can be spent on the head.
	headBudget := debugLogSizeLimit - (len(full) - tailStart + len(cutMsg) + 1)
	if headBudget <= 0 {
		chopEnd()
		return
	}

	// The head must end on a line boundary.
	headEnd := strings.LastIndex(full[:headBudget], "\n")
	if headEnd <= 0 {
		chopEnd()
		return
	}

	// We want to keep 50 lines of the head no matter what.
	if strings.Count(full[:headEnd], "\n") < 50 {
		chopEnd()
		return
	}

	// Assemble head + marker + tail line by line, skipping a marker line that
	// directly follows another marker line. Such duplicates may appear if
	// 'debugLog' (followed by 'trimDebugLog') is called on an already trimmed
	// log multiple times.
	lines := append(strings.Split(full[:headEnd], "\n"), cutMsg)
	lines = append(lines, strings.Split(full[tailStart:], "\n")...)

	var out strings.Builder
	out.Grow(debugLogSizeLimit)
	for i, line := range lines {
		if i > 0 {
			if line == cutMsg && lines[i-1] == cutMsg {
				continue
			}
			out.WriteByte('\n')
		}
		out.WriteString(line)
	}
	e.DebugLog = out.String()
}

// IncomingTriggers is a list of triggers that the invocation consumed.
//
// It is deserialized on the fly from IncomingTriggersRaw on every call, using
// unmarshalTriggersList. The error, if any, comes from that helper.
func (e *Invocation) IncomingTriggers() ([]*internal.Trigger, error) {
	return unmarshalTriggersList(e.IncomingTriggersRaw)
}

// OutgoingTriggers is a list of triggers that the invocation produced.
//
// It is deserialized on the fly from OutgoingTriggersRaw on every call, using
// unmarshalTriggersList. The error, if any, comes from that helper.
func (e *Invocation) OutgoingTriggers() ([]*internal.Trigger, error) {
	return unmarshalTriggersList(e.OutgoingTriggersRaw)
}

// PendingTimers is a list of not-yet-consumed invocation timers.
//
// It is deserialized on the fly from PendingTimersRaw on every call, using
// unmarshalTimersList. The error, if any, comes from that helper.
func (e *Invocation) PendingTimers() ([]*internal.Timer, error) {
	return unmarshalTimersList(e.PendingTimersRaw)
}

// cleanupUnreferencedInvocations makes a best effort attempt to delete the
// given invocations from the datastore.
//
// It is used for cleaning up after failures. Errors are logged, but not
// returned, to indicate that there's nothing we can actually do about them.
//
// Nil entries in 'invs' are skipped. The deletion is performed outside of any
// transaction 'c' may carry, so it is fine to call this function from within
// a transaction.
func cleanupUnreferencedInvocations(c context.Context, invs []*Invocation) {
	doomed := make([]*datastore.Key, 0, len(invs))
	for _, inv := range invs {
		if inv == nil {
			continue
		}
		logging.Warningf(c, "Cleaning up inv %d of job %q", inv.ID, inv.JobID)
		doomed = append(doomed, datastore.KeyForObj(c, inv))
	}

	err := datastore.Delete(datastore.WithoutTransaction(c), doomed)
	if err != nil {
		logging.WithError(err).Warningf(c, "Invocation cleanup failed")
	}
}

// reportCompletionMetrics reports invocation stats to monitoring.
//
// It adds the invocation duration (Finished - Started, in seconds) to
// metricInvocationsDurations, keyed by JobID and Status.
//
// Should be called after the transaction that saves this invocation is
// completed. Panics if the invocation is not in a final status or has no
// Finished time set.
func (e *Invocation) reportCompletionMetrics(c context.Context) {
	if completed := e.Status.Final() && !e.Finished.IsZero(); !completed {
		panic(fmt.Errorf("reportCompletionMetrics on incomplete invocation: %v", e))
	}
	elapsedSec := e.Finished.Sub(e.Started).Seconds()
	metricInvocationsDurations.Add(c, elapsedSec, e.JobID, string(e.Status))
}
